# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools related to i/o in HySoP.
.. currentmodule hysop.tools.io_utils
* :class:`~IO`
* :class:`~IOParams`
* :class:`~Writer`
* :class:`~XMF`, tools to prepare/write xmf files.
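
A minimal usage sketch (the output directory name is illustrative):

>>> from hysop.tools.io_utils import IO
>>> IO.set_default_path('./hysop_io_example')
>>> IO.default_path()
'./hysop_io_example'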
"""
import os
import sys
import psutil
import warnings
import tempfile
import socket
import shutil
import atexit
import subprocess
import numpy as np
from collections import namedtuple
from inspect import getouterframes, currentframe
from re import findall
from hysop.constants import DirectionLabels
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.parameters import MPIParams
from hysop.tools.warning import HysopWarning
from hysop.tools.decorators import requires_cmd
import hysop.core.mpi as mpi
class IO:
"""
Static class with utilities to set/find the place where i/o files
will be read/written.
"""
_default_path = None
_cache_path = None
_tmp_dirs = {}
HDF5 = 998
"""HDF5 format id"""
ASCII = 997
"""ascii format id"""
@staticmethod
@requires_cmd("stat")
def get_fs_type(path):
"""Return the filesystem type of 'path' (e.g. 'ext4', 'nfs'),
as reported by the 'stat' command, run on rank 0 and broadcast
to all processes.
"""
cmd = ["stat", "-f", "-c", "%T", path]
fs_type = ""
if mpi.main_rank == 0:
fs_type = subprocess.check_output(cmd).decode("utf-8")
fs_type = mpi.main_comm.bcast(fs_type, root=0)
return fs_type.replace("\n", "")
@classmethod
def is_shared_fs(cls, path):
"""Return True if 'path' is stored on a shared (network) filesystem.
Only nfs is currently detected.
"""
return cls.get_fs_type(path) in ["nfs"]
@classmethod
def default_path(cls):
"""Get the current default path used for io.
Returns
-------
string
the default value of the current i/o path.
"""
assert cls._default_path is not None, "default path has not been set."
return cls._default_path
@classmethod
def default_ram_path(cls):
"""Get the current default path used for io in memory.
Returns
-------
string
the default value of the current RAM i/o path.
"""
try:
import memory_tempfile
except ImportError as e:
msg = "You are trying to use a RAM filesystem but the 'memory_tempfile' "
msg += "module is not present on your system. "
msg += "Get it from https://gitlab.com/keckj/memory-tempfile."
raise RuntimeError(msg) from e
mt = memory_tempfile.MemoryTempfile(fallback=True)
if mt.found_mem_tempdir():
return mt.gettempdir()
else:
return None
@staticmethod
def check_dir(filepath, io_rank=0, comm=None):
"""Check if the directory of 'filename' exists and creates it if not.
Parameters
-----------
filepath : string
directory path with full or relative path
io_rank : int
rank of the process that performs the check.
comm : mpi communicator
the mpi communicator used for the check.
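Examples
--------
A short sketch (the directory name is illustrative):

>>> import os, tempfile
>>> target = os.path.join(tempfile.gettempdir(), 'hysop_check_dir_demo')
>>> IO.check_dir(target)
>>> os.path.isdir(target)
True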
"""
# Create output dir if required
if comm is None:
comm = mpi.main_comm
if comm.Get_rank() == io_rank:
d = filepath
if not os.path.exists(d):
os.makedirs(d)
@staticmethod
def set_default_path(pathdir):
"""Set a new default path for hysop i/o.
Parameters
-----------
pathdir : string
the new path
"""
assert isinstance(pathdir, str)
IO._default_path = pathdir
IO.check_dir(IO._default_path)
@classmethod
def default_cache_path(cls):
"""Return the default directory used to cache hysop generated files.
Candidates are, in order: the cache directory given by the environment
(see hysop.get_env), '~/.cache', the home directory and the system
temporary directory; the first suitable candidate wins.
"""
from hysop import get_env
home = os.path.expanduser("~")
tmp = tempfile.gettempdir()
candidates = [get_env("CACHE_DIR", None), f"{home}/.cache", home, tmp]
cpath = None
for c in candidates:
if c is None:
continue
elif c == home:
cpath = f"{home}/.hysop"
else:
cpath = f"{c}/hysop"
cpath += f"/{socket.gethostname()}"
break
if cpath is None:
msg = "No suitable caching directory was found in {}."
msg = msg.format(candidates)
raise RuntimeError(msg)
cpath = f"{cpath}/python{sys.version_info.major}_{sys.version_info.minor}"
if not os.path.exists(cpath):
try:
if mpi.main_rank == 0:
os.makedirs(cpath)
except OSError:
pass
return cpath
@staticmethod
def cache_path():
if IO._cache_path is None:
IO.set_cache_path(IO.default_cache_path())
return IO._cache_path
@classmethod
def ram_path(cls):
return cls.default_ram_path()
@classmethod
def get_tmp_dir(cls, key):
"""
Create or get an existing temporary directory.
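
Examples
--------
The same key always maps to the same temporary directory:

>>> d1 = IO.get_tmp_dir('demo')
>>> d2 = IO.get_tmp_dir('demo')
>>> d1 == d2
True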
"""
if key in cls._tmp_dirs:
tmp_dir = cls._tmp_dirs[key]
else:
tmp_dir = tempfile.mkdtemp()
cls._tmp_dirs[key] = tmp_dir
return tmp_dir
@classmethod
def _remove_tmp_dirs(cls):
for f in cls._tmp_dirs.values():
shutil.rmtree(f, ignore_errors=True)
@classmethod
def set_cache_path(cls, path):
"""Set the hysop cache directory.
If 'path' lives on a shared filesystem (which does not correctly
support file locking), a hostname-specific subdirectory is used instead.
"""
if cls.is_shared_fs(path):
hostname = socket.gethostname()
if hostname not in path:
new_path = f"{path}/{hostname}"
msg = "\nSpecified cache path '{}' is stored on a network filesystem "
msg += "which does not correctly support file locking."
msg += "\nSetting cache_path to '{}'."
msg = msg.format(path, new_path)
warnings.warn(msg, HysopWarning)
path = new_path
IO._cache_path = path
IO.check_dir(path)
@staticmethod
def set_datasetname(field_name, topo, direction=None):
"""Return the dataset name of a given continuous field,
saved for a given topology
"""
val = field_name + "_" + str(topo.get_id())
if direction is not None:
val += DirectionLabels[direction]
return val
@staticmethod
def get_datasetnames(filename):
"""Return the list of dataset names present
in hdf input file
Parameters
----------
filename : string
hdf file
Returns
-------
a list of strings
"""
import h5py
hdf_file = h5py.File(filename, "r")
# copy the keys into a list: the view returned by keys()
# becomes invalid once the file is closed.
keys = list(hdf_file.keys())
hdf_file.close()
return keys
class IOParams(
namedtuple(
"IOParams",
[
"filename",
"filepath",
"frequency",
"fileformat",
"dump_times_fp32",
"dump_times_fp64",
"dump_tstart",
"dump_tend",
"dump_func",
"io_leader",
"visu_leader",
"with_last",
"enable_ram_fs",
"force_ram_fs",
"dump_is_temporary",
"postprocess_dump",
"append",
"hdf5_disable_compression",
"hdf5_disable_slicing",
"disk_filepath",
"kwds",
],
)
):
"""
A struct to handle I/O files parameters
Parameters
-----------
filename : string
Name of the file (absolute or relative path)
filepath : string
Location of the file
frequency : int
Frequency of output or input (e.g. every N times steps)
fileformat : int
Format of the file. See notes for available format. Default=HDF5.
dump_times: tuple of floats
Extra dump times, in addition to the frequency-based ones
(stored internally in both single and double precision).
dump_tstart: float
Start to dump at given time. Defaults to -np.inf (no time constraints).
dump_tend: float
Stop to dump at given time. Defaults to +np.inf (no time constraints).
dump_func: function
Generic function to compute the should_dump result.
with_last: boolean
Whether to dump at the last iteration, regardless of frequency.
io_leader : int
Rank of the mpi process dealing with the io. Default is 0.
visu_leader : int
Rank of the mpi process dealing with the graphical io. Default is 0.
enable_ram_fs: bool
Instruct the dumper to write directly to RAM, falling back to filepath/filename when this is not possible.
force_ram_fs: bool
Force the dumper to write directly to RAM, and raise an error when this is not possible (filepath/filename are ignored).
Implies enable_ram_fs.
dump_is_temporary: bool
Instruct the dumper to delete dumped data from disk or RAM after the postprocessing script has been called.
Implies that a postprocessing script is supplied.
postprocess_dump: str
Path to a postprocessing script that will be called after dump.
See hysop/tools/postprocess_dump.sh for an example of post processing script.
hdf5_disable_compression: bool
Disable compression for HDF5 outputs (when available).
Can be used to accelerate in RAM postprocessing.
hdf5_disable_slicing: bool
Disable slicing for HDF5 outputs (when available).
May reduce performance but avoid hdf5 file fragmentation.
append : bool, optional
Whether to append data to existing xmf files (when using the HDF format).
kwds: dict
Custom extra keyword arguments to pass to operators
See examples in hysop.operator.hdf_io
Notes
-----
Format parameter must be one of the following :
- :class:`~IO.HDF5`
- :class:`~IO.ASCII`
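
Examples
--------
A minimal sketch (assumes the default i/o path has been set with
:meth:`IO.set_default_path`, so that relative names can be resolved):

>>> p = IOParams(filename='fields.h5', frequency=10)
>>> p.fileformat == IO.HDF5
True
>>> p.frequency
10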
"""
def __new__(
cls,
filename,
filepath=None,
frequency=1,
fileformat=None,
dump_times=None,
dump_tstart=None,
dump_tend=None,
dump_func=None,
io_leader=0,
visu_leader=0,
with_last=False,
enable_ram_fs=False,
force_ram_fs=False,
dump_is_temporary=False,
postprocess_dump=None,
hdf5_disable_compression=False,
hdf5_disable_slicing=False,
append=False,
**kwds,
):
dump_tstart = first_not_None(dump_tstart, -np.inf)
dump_tend = first_not_None(dump_tend, +np.inf)
fileformat = first_not_None(fileformat, IO.HDF5)
dump_times = first_not_None(dump_times, ())
check_instance(filename, str, allow_none=True)
check_instance(filepath, str, allow_none=True)
check_instance(frequency, int)
check_instance(dump_times, tuple, values=(float, np.float64))
check_instance(dump_tstart, (int, float, np.float64))
check_instance(dump_tend, (int, float, np.float64))
check_instance(io_leader, int)
check_instance(visu_leader, int)
check_instance(with_last, bool)
check_instance(enable_ram_fs, bool)
check_instance(force_ram_fs, bool)
check_instance(dump_is_temporary, bool)
check_instance(postprocess_dump, str, allow_none=True)
check_instance(hdf5_disable_compression, bool)
check_instance(hdf5_disable_slicing, bool)
check_instance(append, bool)
if dump_func is not None:
assert callable(dump_func), "given dump_func must be callable"
assert (
dump_func.__code__.co_argcount
), "given dump_func must take at least one argument (the simulation object)"
frequency = int(frequency)
dump_tstart = float(dump_tstart)
dump_tend = float(dump_tend)
io_leader = int(io_leader)
visu_leader = int(visu_leader)
dump_times_fp64 = tuple(map(np.float64, dump_times))
dump_times_fp32 = tuple(map(np.float32, dump_times))
if force_ram_fs:
enable_ram_fs = True
try:
ram_path = IO.ram_path()
except RuntimeError:
if force_ram_fs:
raise
else:
ram_path = None
disk_filepath = None
if enable_ram_fs and (ram_path is not None):
if filename:
assert not os.path.isabs(filename), filename
disk_filepath = filepath
filepath = ram_path
if filename:
if os.path.isabs(filename):
# filename is an absolute path: the filepath arg is ignored.
filepath = os.path.dirname(filename)
else:
if filepath is not None:
filename = os.path.join(filepath, filename)
filepath = os.path.abspath(os.path.dirname(filename))
else:
filepath = os.path.dirname(filename)
if filepath == "":
# Get default output path
filepath = IO.default_path()
filename = os.path.join(filepath, filename)
else:
filepath = os.path.abspath(filepath)
filename = os.path.join(filepath, os.path.basename(filename))
elif filepath:
filepath = os.path.abspath(filepath)
else:
filepath = IO.default_path()
IO.check_dir(filepath)
if disk_filepath is None:
disk_filepath = filepath
if dump_is_temporary:
msg = "Dump is temporary but no postprocessing script has been supplied"
assert postprocess_dump is not None, msg
return super().__new__(
cls,
filename,
filepath,
frequency,
fileformat,
dump_times_fp32,
dump_times_fp64,
dump_tstart,
dump_tend,
dump_func,
io_leader,
visu_leader,
with_last,
enable_ram_fs,
force_ram_fs,
dump_is_temporary,
postprocess_dump,
append,
hdf5_disable_compression,
hdf5_disable_slicing,
disk_filepath,
kwds,
)
def should_dump(self, simulation):
"""Return True if data should be dumped for the current simulation state.
The decision is delegated to 'dump_func' when provided; otherwise it
combines the dump frequency, the extra dump times and the
[dump_tstart, dump_tend] interval (plus the last iteration
when 'with_last' is set).
"""
if self.dump_func is not None:
return self.dump_func(simulation)
frequency = self.frequency
t = simulation.t()
dump = (frequency >= 0) and (self.with_last and simulation._next_is_last)
if t > self.dump_tend + simulation.tol:
return dump
if (frequency >= 0) and simulation.is_time_of_interest:
if isinstance(t, np.float32):
dump |= t in self.dump_times_fp32
elif isinstance(t, np.float64):
dump |= t in self.dump_times_fp64
else:
raise NotImplementedError(type(t))
if frequency > 0:
dump |= (simulation.current_iteration % frequency) == 0
return dump
def clone(self, **kwds):
"""Return a copy of these parameters, overriding the given keyword arguments."""
keys = (
"filename",
"frequency",
"fileformat",
"dump_times",
"dump_tstart",
"dump_tend",
"dump_func",
"io_leader",
"visu_leader",
"with_last",
"enable_ram_fs",
"force_ram_fs",
"dump_is_temporary",
"postprocess_dump",
"hdf5_disable_compression",
"hdf5_disable_slicing",
"append",
"kwds",
)
diff = set(kwds.keys()).difference(keys)
if diff:
msg = f"Unknown parameters {diff} for class {self.__class__.__name__}."
raise ValueError(msg)
all_kwds = {}
for k in keys:
if k == "kwds":
for k, v in kwds.get(k, getattr(self, k)).items():
all_kwds[k] = v
else:
all_kwds[k] = kwds.get(k, getattr(self, k))
all_kwds["filepath"] = kwds.get("filepath", getattr(self, "disk_filepath"))
return IOParams(**all_kwds)
@property
def dump_times(self):
return self.dump_times_fp64
def __str__(self):
return self.to_string()
def to_string(self, prefix=""):
ss = """filename: {}
filepath: {}
fileformat: {}
frequency: {}
dump_times: {}
dump_tstart: {}
dump_tend: {}
dump_func: {}
io_leader: {}
visu_leader: {}
enable_ram_fs: {}
force_ram_fs: {}
dump_is_tmp: {}
post_process: {}
hdf5_no_compr: {}
hdf5_no_slice: {}
append: {}
extra_kwds: {}""".format(
self.filename,
self.filepath,
self.fileformat,
self.frequency,
self.dump_times,
self.dump_tstart,
self.dump_tend,
self.dump_func,
self.io_leader,
self.visu_leader,
self.enable_ram_fs,
self.force_ram_fs,
self.dump_is_temporary,
self.postprocess_dump,
self.hdf5_disable_compression,
self.hdf5_disable_slicing,
self.append,
self.kwds,
)
return prefix + ("\n" + prefix).join(ss.split("\n"))
class Writer:
"""
To write data from a 2D numpy array into an ascii file.
Examples
--------
>>> from hysop.tools.io_utils import IOParams, IO, Writer
>>> params = IOParams(filename='r.dat', fileformat=IO.ASCII)
>>> wr = Writer(params, buffshape=(1, 2))
>>> ite = 3 # current iteration number
>>> if wr.do_write(ite):
... wr.buffer[...] = 3.
... wr.write()
>>> wr.finalize()

The buffer content is then written into the file 'r.dat'.
"""
def __init__(self, io_params, buffshape=None, mpi_params=None, safe_io=True):
"""
Parameters
----------
io_params : hysop.tools.io_utils.IOParams
Setup for file output (name, location, ...)
buffshape : tuple
2D numpy.array.shape like tuple, shape of the output/input buffer.
mpi_params : hysop.tools.parameters.MPIParams
Mpi setup (comm that owns the writer)
safe_io : boolean
True --> open/close the file every time data are written.
False --> open at init and close during finalize.
Cheaper, but data are lost if the simulation crashes.
"""
# Absolute path + name for i/o file
# Note that if filename contains absolute path
# filepath is ignored
msg = "wrong type for io_params arg."
assert isinstance(io_params, IOParams), msg
assert io_params.fileformat == IO.ASCII
self.io_params = io_params
# A reference communicator, just to identify a
# process rank for io.
if mpi_params is None:
mpi_params = MPIParams()
else:
msg = "wrong type for mpi_params arg."
assert isinstance(mpi_params, MPIParams), msg
self.mpi_params = mpi_params
# check if output dir exists, create it if not.
IO.check_dir(
self.io_params.filepath, self.io_params.io_leader, self.mpi_params.comm
)
# Shape of the output buffer (must be a 2D numpy array)
if buffshape is None:
buffshape = (1, 1)
self._buffshape = buffshape
assert (
len(self._buffshape) == 2
), "2D shape required : set arg buffshape as a 2D tuple: (x,y)"
# The buffer (numpy array) that will be printed to a file
from hysop.tools.numpywrappers import npw
self.buffer = npw.zeros(self._buffshape)
" buffer used to save printed data"
# Defines how the output file is handled:
# True --> open/close the file every time data are written.
# False --> open at init and close during finalize.
# Cheaper, but data are lost if the simulation crashes.
if safe_io:
self.write = self._fullwrite
else:
self.write = self._partialwrite
# Force synchro to be sure that all output dirs
# have been created.
self.mpi_params.comm.barrier()
if self.mpi_params.rank == self.io_params.io_leader:
self._file = open(self.io_params.filename, "w")
def do_write(self, ite):
"""Returns true if output is required
for iteration ite
Parameters
----------
ite : int
current iteration number
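
Examples
--------
A sketch with frequency=2: output happens every second iteration
(assumes the default i/o path has been set):

>>> wr = Writer(IOParams(filename='w.dat', fileformat=IO.ASCII,
...                      frequency=2))
>>> [ite for ite in range(6) if wr.do_write(ite)]
[1, 3, 5]
>>> wr.finalize()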
"""
num = ite + 1 # Number of iterations done
rk = self.mpi_params.rank
return rk == self.io_params.io_leader and (num % self.io_params.frequency) == 0
def _ft_write(self):
"""Write a two-dim. NumPy array a in tabular form to fileobj."""
# Function taken from scitools
# fastest version (of the write family of functions) so far...
# written by Mario Pernici <Mario.Pernici@mi.infn.it>
fileobj, a = self._file, self.buffer
if len(a.shape) != 2:
raise TypeError("a 2D array is required, shape now is " + str(a.shape))
N = 512
shape0 = a.shape[0]
shape1 = a.shape[1]
str_fmt = "%g\t" * (shape1 - 1) + "%g\n"
# use a big format string
str_fmt_N = str_fmt * N
for i in range(shape0 // N):
# write rows in blocks of N: the slice must advance by N rows
a1 = a[i * N : (i + 1) * N, :]
# put a1 in 1D array form; ravel better than reshape for
# non-contiguous arrays.
a1 = np.ravel(a1)
fileobj.write(str_fmt_N % tuple(a1))
for i in range(shape0 - shape0 % N, shape0):
fileobj.write(str_fmt % tuple(a[i]))
def _fullwrite(self):
"""open, write and close"""
# import scitools.filetable as ft
self._file = open(self.io_params.filename, "a")
# ft.write(self._file, self.buffer)
self._ft_write()
self._file.close()
def _partialwrite(self):
"""just write, no open, nor close"""
# import scitools.filetable as ft
# ft.write(self._file, self.buffer)
self._ft_write()
def finalize(self):
"""close, if required"""
if self.mpi_params.rank == self.io_params.io_leader:
if not self._file.closed:
self._file.close()
def __str__(self):
# __str__ must return a string on every rank, not None.
if self.mpi_params.rank != self.io_params.io_leader:
return ""
s = " === Writer === \n"
s += " - filename = " + self.io_params.filename
s += "\n - buffshape = " + str(self._buffshape)
s += "\n - frequ = " + str(self.io_params.frequency)
return s
class XMF:
"""Static class - Tools to prepare and write xmf file"""
@staticmethod
def _list_format(l):
"""Format a list to the xml output.
Removes the '[]()' and replace ',' with ' ' in default str.
Parameters
----------
l : list to format
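
Examples
--------
>>> XMF._list_format([16, 32, 64])
'16  32  64'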
"""
buff = str(l).replace(",", " ").replace("[", "")
return buff.replace("]", "").replace("(", "").replace(")", "")
@staticmethod
def prepare_grid_attributes(
dataset_names, resolution, origin, step, joinrkfiles=None
):
"""
Prepare XDMF header as a string.
Parameters
-----------
dataset_names : list
all datasets names
resolution: 3d tuple
origin: 3d tuple
step: 3d tuple
joinrkfiles : list of int, optional
ranks whose per-rank files must be joined in the output
(parallel runs without HDF5 parallel support)
Returns:
--------
string
the xml-like header formattable with the following keywords:
niteration : iteration number
time: time in seconds
filename: target file name, in sequential or with parallel hdf5 support
filename0, ... filenameN : target file names for each rank 0 to N, in parallel without HDF5 parallel support
resolution0, ... resolutionN : local resolutions for each rank 0 to N, in parallel without HDF5 parallel support
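
Examples
--------
A minimal sketch; '{niteration}', '{time}' and '{filename}' remain as
placeholders in the returned header:

>>> header = XMF.prepare_grid_attributes(
...     ['rho'], (65, 65, 65), (0.0, 0.0, 0.0), (0.1, 0.1, 0.1))
>>> '3DCORECTMesh' in header
True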
"""
# The header (xml-like), saved in a string.
# Always use a 3D mesh because paraview does not handle 2D meshes well
# (it uses axes (Y,Z) instead of (X,Y)).
xml_grid = ""
topo_type = "3DCORECTMesh"
geo_type = "ORIGIN_DXDYDZ"
xml_grid += ' <Grid Name="Iteration {}"'.format("{niteration:06d}")
xml_grid += ' GridType="Uniform">\n'
xml_grid += ' <Time Value="{}" />\n'.format("{time}")
xml_grid += ' <Topology TopologyType="' + str(topo_type) + '"'
xml_grid += ' NumberOfElements="'
xml_grid += XMF._list_format(resolution) + ' "/>\n'
xml_grid += ' <Geometry GeometryType="' + geo_type + '">\n'
xml_grid += ' <DataItem Dimensions="' + str(3) + ' "'
xml_grid += ' NumberType="Float" Precision="8" Format="XML">\n'
xml_grid += " " + XMF._list_format(origin) + "\n"
xml_grid += " </DataItem>\n"
xml_grid += ' <DataItem Dimensions="' + str(3) + ' "'
xml_grid += ' NumberType="Float" Precision="8" Format="XML">\n'
xml_grid += " " + XMF._list_format(step) + "\n"
xml_grid += " </DataItem>\n"
xml_grid += " </Geometry>\n"
# Append dataset parameters
for name in dataset_names:
xml_grid += ' <Attribute Name="'
xml_grid += name + '"'
xml_grid += ' AttributeType="Scalar" Center="Node">\n'
if joinrkfiles is None:
xml_grid += ' <DataItem Dimensions="'
xml_grid += XMF._list_format(resolution) + ' "'
xml_grid += ' NumberType="Float" Precision="8" Format="HDF"'
xml_grid += ' Compression="Raw">\n' #
xml_grid += " {filename}"
xml_grid += ":/" + name
xml_grid += "\n </DataItem>\n"
else:
xml_grid += ' <DataItem Dimensions="'
xml_grid += XMF._list_format(resolution) + ' "'
xml_grid += ' ItemType="Function" Function="JOIN('
xml_grid += " ; ".join("$" + str(i) for i in joinrkfiles)
xml_grid += ')">\n'
for i in joinrkfiles:
xml_grid += ' <DataItem Dimensions="'
xml_grid += "{resolution" + str(i) + "}" + ' "'
xml_grid += ' NumberType="Float" Precision="8" Format="HDF"'
xml_grid += ' Compression="Raw">\n' #
xml_grid += " {filename" + str(i) + "}"
xml_grid += ":/" + name
xml_grid += "\n </DataItem>\n"
xml_grid += " </DataItem>\n"
xml_grid += " </Attribute>\n"
xml_grid += " </Grid>\n"
return xml_grid
atexit.register(IO._remove_tmp_dirs)